-
Notifications
You must be signed in to change notification settings - Fork 1.1k
chore: disk backpressure class utility #6020
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Kostas Kyrimis <[email protected]>
| } | ||
|
|
||
| DiskBackedBackpressureQueue::~DiskBackedBackpressureQueue() { | ||
| auto ec = writer_->Close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: best practices suggest separating between i/o operations and d'tor functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% if you recall we had issues around ProtocolClient destructor recently because of that 😄
|
|
||
| #pragma once | ||
|
|
||
| #include <util/fibers/uring_file.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you do not need uring or linux specific api for this, imho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 3 API's:
- Uring (uring_file.h) -> asynchronous fiber blocking
- Linux (file.h) ->
thread blockingvia writev/preadv - C++ files (fstream) - >
thread blocking
Why would we block the entire thread on a disk write ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant to use io/file.h interface, you do not need to leak the implementation in h file.
For epoll we can use helio/util/fibers/fiber_file.h though it's not a priority and it's fine to assume this feature is only for iouring enabled systems but in any case you will have to write code in such way that it will build on macos (so better not to leak iouring in header files)
|
|
||
| namespace facade { | ||
|
|
||
| class DiskBackedBackpressureQueue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's rename it to DiskBackedQueue and the file should be renamed as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds disk-backed backpressure support for connections in DragonflyDB. When enabled via the disk_backpressure_offload_watermark flag, connections can offload backpressure data to disk instead of holding everything in memory, helping prevent out-of-memory conditions during high load scenarios.
- Introduces
DiskBackedBackpressureQueueclass for managing disk-backed queue operations - Adds configuration flags for controlling disk backpressure behavior (folder location, file size limits, load batch size)
- Integrates disk backpressure support into the Connection class infrastructure
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| src/facade/dragonfly_connection.h | Adds forward declaration for DiskBackedBackpressureQueue and member variables for disk backpressure support |
| src/facade/dragonfly_connection.cc | Integrates disk backpressure initialization into Connection constructor with new flag and error handling |
| src/facade/disk_connection_backpressure.h | Defines new DiskBackedBackpressureQueue class interface for disk-backed backpressure management |
| src/facade/disk_connection_backpressure.cc | Implements disk-backed queue operations including file I/O, offloading, and loading from disk |
| src/facade/CMakeLists.txt | Adds new disk_connection_backpressure.cc source file to build configuration |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // For each item loaded from disk it calls f(item) to consume it. | ||
| // Reads up to max_queue_load_size_ items on each call | ||
| void LoadFromBacking(std::function<void(io::MutableBytes)> f); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really dislike such interfaces that internalize action. What if the connection wants to stop based on execution time, some other state conditions, shutdown, etc. It's not the queues job to manage this at all
string_view/size_t Read(std::string* buffer) whould be enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to that
| auto res = writer_->Write(blob); | ||
| if (res) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
res looks like a Result to me, but not like an ec -> error_code 🙂
| } | ||
|
|
||
| size_t DiskBackedBackpressureQueue::TotalInMemoryBytes() const { | ||
| return offsets_.size() * sizeof(ItemOffset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sadly there's really no way to get the capacity of a dequeue
Signed-off-by: Kostas Kyrimis <[email protected]>
src/facade/disk_backed_queue.cc
Outdated
|
|
||
| next_read_offset_ += bytes.size(); | ||
|
|
||
| if (total_backing_items_ > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we push and pop at the same time? The variable might have changed
src/facade/disk_backed_queue.cc
Outdated
| std::error_code DiskBackedQueue::Pop(std::string* out) { | ||
| // We read the next item and (if there are more) we also prefetch the next item's size. | ||
| uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? sizeof(uint32_t) : 0); | ||
| buffer.resize(read_sz); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that you use buffer here, parallel pull and pop are not allowed. But either way, can't we use out directly to read into?
romange
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am holding on with the review, as this class will change (and will become simpler) if we just write raw socket data instead of parsed messages
| std::unique_ptr<io::ReadonlyFile> reader_; | ||
|
|
||
| size_t total_backing_bytes_ = 0; | ||
| size_t total_backing_items_ = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
total_backing_items_ is redundant now
|
|
||
| std::error_code Push(std::string_view blob); | ||
|
|
||
| std::error_code Pop(std::string* out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not a good interface now - as you do not pop an element. You will usually have a destination buffer which you will want to fill till the end of its capacity or less. Similarly to socket Recv() interface.
| // Check if backing file is empty, i.e. backing file has 0 bytes. | ||
| bool Empty() const; | ||
|
|
||
| std::error_code CloseReader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to control Reader and Writer independently?
Seems that if Empty(), you just want to shut the queue altogether
|
I think there is still work that needs to be done. In any case, I suggest checking how it will fit once you integreate |
This PR adds a simple producer/consumer utility class that for a backing file F can:
This class will be used by dragonfly connection to offload commands when dispatch queue surpasses a given value.
No functional change in dragonfly_connection -- I just added the initialization paths and the flags needed.
Resolves #6029